{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Name\n",
    "\n",
    "Gather training data by querying BigQuery \n",
    "\n",
    "\n",
    "# Labels\n",
    "\n",
    "GCP, BigQuery, Kubeflow, Pipeline\n",
    "\n",
    "\n",
    "# Summary\n",
    "\n",
    "A Kubeflow Pipeline component to submit a query to BigQuery and store the result in a Cloud Storage bucket.\n",
    "\n",
    "\n",
    "# Details\n",
    "\n",
    "\n",
    "## Intended use\n",
    "\n",
    "Use this Kubeflow component to:\n",
    "*   Select training data by submitting a query to BigQuery.\n",
    "*   Output the training data into a Cloud Storage bucket as CSV files.\n",
    "\n",
    "\n",
    "## Runtime arguments:\n",
    "\n",
    "\n",
    "| Argument | Description | Optional | Data type | Accepted values | Default |\n",
    "|----------|-------------|----------|-----------|-----------------|---------|\n",
    "| query | The query used by BigQuery to fetch the results. | No | String |  |  |\n",
    "| project_id | The project ID of the Google Cloud Platform (GCP) project to use to execute the query. | No | GCPProjectID |  |  |\n",
    "| dataset_id | The ID of the persistent BigQuery dataset to store the results of the query. If the dataset does not exist, the operation will create a new one. | Yes | String |  | None |\n",
    "| table_id | The ID of the BigQuery table to store the results of the query. If the table ID is absent, the operation will generate a random ID for the table. | Yes | String |  | None |\n",
    "| output_gcs_path | The path to the Cloud Storage bucket to store the query output. | Yes | GCSPath |  | None |\n",
    "| dataset_location | The location where the dataset is created. Defaults to US. | Yes | String |  | US |\n",
    "| job_config | The full configuration specification for the query job. See [QueryJobConfig](https://googleapis.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig) for details. | Yes | Dict | A JSONobject which has the same structure as [QueryJobConfig](https://googleapis.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig) | None |\n",
    "## Input data schema\n",
    "\n",
    "The input data is a BigQuery job containing a query that pulls data f rom various sources. \n",
    "\n",
    "\n",
    "## Output:\n",
    "\n",
    "Name | Description | Type\n",
    ":--- | :---------- | :---\n",
    "output_gcs_path | The path to the Cloud Storage bucket containing the query output in CSV format. | GCSPath\n",
    "\n",
    "## Cautions & requirements\n",
    "\n",
    "To use the component, the following requirements must be met:\n",
    "\n",
    "*   The BigQuery API is enabled.\n",
    "*   The component can authenticate to use GCP APIs. Refer to [Authenticating Pipelines to GCP](https://www.kubeflow.org/docs/gke/authentication-pipelines/) for details.\n",
    "*   The Kubeflow user service account is a member of the `roles/bigquery.admin` role of the project.\n",
    "*   The Kubeflow user service account is a member of the `roles/storage.objectCreator `role of the Cloud Storage output bucket.\n",
    "\n",
    "## Detailed description\n",
    "This Kubeflow Pipeline component is used to:\n",
    "*   Submit a query to BigQuery.\n",
    "    *   The query results are persisted in a dataset table in BigQuery.\n",
    "    *   An extract job is created in BigQuery to extract the data from the dataset table and output it to a Cloud Storage bucket as CSV files.\n",
    "\n",
    "    Use the code below as an example of how to run your BigQuery job.\n",
    "\n",
    "### Sample\n",
    "\n",
    "Note: The following sample code works in an IPython notebook or directly in Python code.\n",
    "\n",
    "#### Set sample parameters"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%capture --no-stderr\n",
    "\n",
    "!pip3 install kfp --upgrade"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "2. Load the component using KFP SDK"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.components as comp\n",
    "\n",
    "bigquery_query_op = comp.load_component_from_url(\n",
    "    'https://raw.githubusercontent.com/kubeflow/pipelines/01a23ae8672d3b18e88adf3036071496aca3552d/components/gcp/bigquery/query/component.yaml')\n",
    "help(bigquery_query_op)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Sample\n",
    "\n",
    "Note: The following sample code works in IPython notebook or directly in Python code.\n",
    "\n",
    "In this sample, we send a query to get the top questions from stackdriver public data and output the data to a Cloud Storage bucket. Here is the query:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "QUERY = 'SELECT * FROM `bigquery-public-data.stackoverflow.posts_questions` LIMIT 10'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Set sample parameters"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "parameters"
    ]
   },
   "outputs": [],
   "source": [
    "# Required Parameters\n",
    "PROJECT_ID = '<Please put your project ID here>'\n",
    "GCS_WORKING_DIR = 'gs://<Please put your GCS path here>' # No ending slash"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional Parameters\n",
    "EXPERIMENT_NAME = 'Bigquery -Query'\n",
    "OUTPUT_PATH = '{}/bigquery/query/questions.csv'.format(GCS_WORKING_DIR)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Run the component as a single pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.dsl as dsl\n",
    "import json\n",
    "@dsl.pipeline(\n",
    "    name='Bigquery query pipeline',\n",
    "    description='Bigquery query pipeline'\n",
    ")\n",
    "def pipeline(\n",
    "    query=QUERY, \n",
    "    project_id = PROJECT_ID, \n",
    "    dataset_id='', \n",
    "    table_id='', \n",
    "    output_gcs_path=OUTPUT_PATH, \n",
    "    dataset_location='US', \n",
    "    job_config=''\n",
    "):\n",
    "    bigquery_query_op(\n",
    "        query=query, \n",
    "        project_id=project_id, \n",
    "        dataset_id=dataset_id, \n",
    "        table_id=table_id, \n",
    "        output_gcs_path=output_gcs_path, \n",
    "        dataset_location=dataset_location, \n",
    "        job_config=job_config)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Compile the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_func = pipeline\n",
    "pipeline_filename = pipeline_func.__name__ + '.zip'\n",
    "import kfp.compiler as compiler\n",
    "compiler.Compiler().compile(pipeline_func, pipeline_filename)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Submit the pipeline for execution"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Specify pipeline argument values\n",
    "arguments = {}\n",
    "\n",
    "#Get or create an experiment and submit a pipeline run\n",
    "import kfp\n",
    "client = kfp.Client()\n",
    "experiment = client.create_experiment(EXPERIMENT_NAME)\n",
    "\n",
    "#Submit a pipeline run\n",
    "run_name = pipeline_func.__name__ + ' run'\n",
    "run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Inspect the output"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gsutil cat $OUTPUT_PATH"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## References\n",
    "* [Component python code](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/component_sdk/python/kfp_component/google/bigquery/_query.py)\n",
    "* [Component docker file](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/Dockerfile)\n",
    "* [Sample notebook](https://github.com/kubeflow/pipelines/blob/master/components/gcp/bigquery/query/sample.ipynb)\n",
    "* [BigQuery query REST API](https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query)\n",
    "\n",
    "## License\n",
    "By deploying or using this software you agree to comply with the [AI Hub Terms of Service](https://aihub.cloud.google.com/u/0/aihub-tos) and the [Google APIs Terms of Service](https://developers.google.com/terms/). To the extent of a direct conflict of terms, the AI Hub Terms of Service will control."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}